
[SPARK-23877][SQL]: Use filter predicates to prune partitions in metadata-only queries #20988

Closed

Conversation

Contributor

@rdblue rdblue commented Apr 5, 2018

What changes were proposed in this pull request?

This updates the OptimizeMetadataOnlyQuery rule to use filter expressions when listing partitions, if there are filter nodes in the logical plan. This avoids listing all partitions for large tables on the driver.

This also fixes a minor bug where the partitions returned from fsRelation cannot be serialized without hitting a stack level too deep error. This is caused by serializing a stream to executors, where the stream is a recursive structure. If the stream is too long, the serialization stack reaches the maximum level of depth. The fix is to create a LocalRelation using an Array instead of the incoming Seq.
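
For illustration, a minimal sketch of the Stream-vs-Array issue in plain Scala (not the actual Spark code; `PartitionValue` is a made-up stand-in for a partition row):

// A Scala Stream is a recursive (head, tail) structure, so serializing a very
// long stream recurses once per element and can overflow the stack when the
// LocalRelation is shipped to executors.
case class PartitionValue(p: Int)

// Directory listing may hand back a lazily built Stream.
val partitionRows: Seq[PartitionValue] = Stream.tabulate(100000)(i => PartitionValue(i))

// Fix: materialize into a flat, Array-backed Seq before building the relation,
// so serialization no longer walks a deeply nested recursive structure.
val materialized: Seq[PartitionValue] = partitionRows.toArray.toSeq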

How was this patch tested?

Existing tests for metadata-only queries.

rdblue added 2 commits April 5, 2018 16:22
The LocalRelation created for partition data for metadata-only queries
may be a stream produced by listing directories. If the stream is large,
serializing the LocalRelation to executors results in a stack overflow
because the stream is a recursive structure of (head, rest-of-stream).
@SparkQA

SparkQA commented Apr 6, 2018

Test build #88959 has finished for PR 20988 at commit 2345896.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@rdblue
Contributor Author

rdblue commented Apr 13, 2018

@cloud-fan or @gatorsmile, could you review this?

@cloud-fan
Contributor

Can we add a test? We can use `HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount()` to check whether this patch really reduces the number of partitions being fetched.
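
A rough sketch of what such a test could look like (the table name, partition column, and `totalPartitionCount` are illustrative; assumes `HiveCatalogMetrics` from `org.apache.spark.metrics.source` and a Hive test session where `sql` is available):

import org.apache.spark.metrics.source.HiveCatalogMetrics

// Reset the static counters, run a metadata-only query with a partition
// filter, and check that fewer than all partitions were fetched.
HiveCatalogMetrics.reset()
sql("SELECT DISTINCT part FROM metadata_only_tbl WHERE part > 5").collect()
val fetched = HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount()
// totalPartitionCount: total number of partitions created by the test setup (hypothetical)
assert(fetched < totalPartitionCount, s"fetched $fetched partitions; expected partition pruning")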


case p @ Project(projectList, child) if projectList.forall(_.deterministic) =>
  unapply(child).flatMap { case (partAttrs, filters, relation) =>
    if (p.references.subsetOf(partAttrs)) Some((p.outputSet, filters, relation)) else None
Contributor

What about `Filter(p > 1, Project(a, p, Table(a, b, p, partitioned by p)))`? `p > 1` should also be a partition filter.

Contributor

I'd propose something top-down like

def getPartitionedRelation(
    plan: LogicalPlan,
    predicates: Seq[Expression]): Option[(AttributeSet, Seq[Expression], LogicalPlan)] = {
  plan match {
    case Filter(condition, child) if condition.deterministic =>
      getPartitionedRelation(child, predicates ++ splitConjunctivePredicates(condition))
   
    case Project(projectList, child) if projectList.forall(_.deterministic) =>
      getPartitionedRelation(child, predicates.filter(_.references.subsetOf(child.outputSet)))
    
    case l @ LogicalRelation(fsRelation: HadoopFsRelation, _, _, _) if fsRelation.partitionSchema.nonEmpty =>
      val partAttrs = ...
      val partitionFilters = predicates.filter(_.references.subsetOf(partAttrs))
      Some(...)

    case _ => None
  }
}

Contributor Author


@cloud-fan, that is basically how this works already. Each matched node calls `unapply(child)` to get the result from the child node, then adds the current node's conditions to that result. Using `unapply` instead of `getPartitionedRelation` makes this work in the matching rule:

  case a @ Aggregate(_, aggExprs, child @ PartitionedRelation(partAttrs, filters, relation)) =>
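
(For context, a generic illustration of the extractor pattern, not Spark's actual classes: defining `unapply` on an object lets the recursive collection logic appear directly inside a `case` pattern.)

sealed trait Node
case class Leaf(value: Int) extends Node
case class Wrap(child: Node) extends Node

object DeepestLeaf {
  // Recursively unwraps Wrap nodes and extracts the leaf value, much like
  // unapply(child) recursing through Project/Filter nodes in the rule.
  def unapply(node: Node): Option[Int] = node match {
    case Leaf(v)     => Some(v)
    case Wrap(child) => unapply(child)
  }
}

Wrap(Wrap(Leaf(7))) match {
  case DeepestLeaf(v) => println(s"found leaf $v")   // prints "found leaf 7"
  case _              => println("no leaf")
}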

@rdblue
Contributor Author

rdblue commented Apr 18, 2018

@cloud-fan, I've added the test. Thanks for letting me know about HiveCatalogMetrics, that's exactly what I needed.

@SparkQA

SparkQA commented Apr 19, 2018

Test build #89535 has finished for PR 20988 at commit 6e0685e.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

case f @ Filter(condition, child) if condition.deterministic =>
  unapply(child).flatMap { case (partAttrs, filters, relation) =>
    if (f.references.subsetOf(partAttrs)) {
      Some((partAttrs, splitConjunctivePredicates(condition) ++ filters, relation))
Contributor


There is a bug here. Think about `Filter(x > 1, Project(p + 1 as x, Table(a, p, partitioned by p)))`: we would mistakenly report `x > 1` as a partition predicate, use it to list partitions, and fail.

I think we should use `PhysicalOperation` here, which can help us substitute the attributes in the filter.
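
To make the suggested substitution concrete, a toy sketch (not Catalyst code; all class names are made up) of rewriting the aliased predicate so it references only the partition column:

sealed trait Expr
case class Attr(name: String) extends Expr
case class Lit(value: Int) extends Expr
case class Add(left: Expr, right: Expr) extends Expr
case class GreaterThan(left: Expr, right: Expr) extends Expr

// Replace attribute references using the Project's alias map, the way a
// PhysicalOperation-style collection substitutes aliases before pushdown.
def substitute(e: Expr, aliases: Map[String, Expr]): Expr = e match {
  case Attr(n)           => aliases.getOrElse(n, e)
  case Add(l, r)         => Add(substitute(l, aliases), substitute(r, aliases))
  case GreaterThan(l, r) => GreaterThan(substitute(l, aliases), substitute(r, aliases))
  case _                 => e
}

// Project(p + 1 AS x, ...) supplies the alias map; the filter x > 1 becomes
// (p + 1) > 1, which references only the partition column p.
val aliases   = Map("x" -> Add(Attr("p"), Lit(1)))
val rewritten = substitute(GreaterThan(Attr("x"), Lit(1)), aliases)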

Contributor Author

@rdblue rdblue Apr 19, 2018


Good catch. I've added that as a test case and updated the `PartitionedRelation` code to keep track of both the original partition attributes (which the filter needs) and the top-most node's output that is used by the rule. One thing to note: the optimizer usually pushes `Filter` below `Project` by the time this rule runs, so it is difficult to even construct this test case.

As for using `PhysicalOperation` instead of `PartitionedRelation`, I don't see a compelling reason for such an invasive change. This currently adds a couple of results to `unapply` and keeps mostly the same logic. `PhysicalOperation` would lose the check that the references are a subset of the partition attributes, and it would be a much larger change. If you think this should be refactored, let's talk about that separately to understand the motivation for the change.

@SparkQA

SparkQA commented Apr 19, 2018

Test build #89588 has finished for PR 20988 at commit 93cf217.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor

Thanks, merging to master!

@asfgit asfgit closed this in b3fde5a Apr 20, 2018
ghost pushed a commit to dbtsai/spark that referenced this pull request Apr 23, 2018
…ndling of Project and Filter over partitioned relation

## What changes were proposed in this pull request?

A followup of apache#20988

`PhysicalOperation` can collect Project and Filters over a certain plan and substitute the alias with the original attributes in the bottom plan. We can use it in `OptimizeMetadataOnlyQuery` rule to handle the Project and Filter over partitioned relation.

## How was this patch tested?

existing test

Author: Wenchen Fan <[email protected]>

Closes apache#21111 from cloud-fan/refactor.
jzhuge pushed a commit to jzhuge/spark that referenced this pull request Mar 7, 2019
…ata-only queries

## What changes were proposed in this pull request?

This updates the OptimizeMetadataOnlyQuery rule to use filter expressions when listing partitions, if there are filter nodes in the logical plan. This avoids listing all partitions for large tables on the driver.

This also fixes a minor bug where the partitions returned from fsRelation cannot be serialized without hitting a stack level too deep error. This is caused by serializing a stream to executors, where the stream is a recursive structure. If the stream is too long, the serialization stack reaches the maximum level of depth. The fix is to create a LocalRelation using an Array instead of the incoming Seq.

## How was this patch tested?

Existing tests for metadata-only queries.

Author: Ryan Blue <[email protected]>

Closes apache#20988 from rdblue/SPARK-23877-metadata-only-push-filters.

(cherry picked from commit b3fde5a)
jzhuge pushed a commit to jzhuge/spark that referenced this pull request Mar 7, 2019
…ndling of Project and Filter over partitioned relation

A followup of apache#20988

`PhysicalOperation` can collect Project and Filters over a certain plan and substitute the alias with the original attributes in the bottom plan. We can use it in `OptimizeMetadataOnlyQuery` rule to handle the Project and Filter over partitioned relation.

existing test

Author: Wenchen Fan <[email protected]>

Closes apache#21111 from cloud-fan/refactor.

(cherry picked from commit f70f46d)

Conflicts:
	sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LocalRelation.scala